{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exercise 2 - Parallel Data Processing with Task Dependencies\n",
    "\n",
    "**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.\n",
    "\n",
    "In this exercise, we construct a sequence of tasks each of which depends on the previous mimicking a data parallel application. Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.\n",
    "\n",
    "In this exercise, you will use Ray to parallelize the computation below and speed it up.\n",
    "\n",
    "### Concept for this Exercise - Task Dependencies\n",
    "\n",
    "Suppose we have two remote functions defined as follows.\n",
    "\n",
    "```python\n",
    "@ray.remote\n",
    "def f(x):\n",
    "    return x\n",
    "```\n",
    "\n",
    "Arguments can be passed into remote functions as usual.\n",
    "\n",
    "```python\n",
    ">>> x1_id = f.remote(1)\n",
    ">>> ray.get(x1_id)\n",
    "1\n",
    "\n",
    ">>> x2_id = f.remote([1, 2, 3])\n",
    ">>> ray.get(x2_id)\n",
    "[1, 2, 3]\n",
    "```\n",
    "\n",
    "**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be a retrieved as a regular Python object**.\n",
    "\n",
    "```python\n",
    ">>> y1_id = f.remote(x1_id)\n",
    ">>> ray.get(y1_id)\n",
    "1\n",
    "\n",
    ">>> y2_id = f.remote(x2_id)\n",
    ">>> ray.get(y2_id)\n",
    "[1, 2, 3]\n",
    "```\n",
    "\n",
    "So when implementing a remote function, the function should expect a regular Python object regardless of whether the caller passes in a regular Python object or an object ID.\n",
    "\n",
    "**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.\n",
    "\n",
    "- The second task will not be executed until the first task has finished executing.\n",
    "- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import numpy as np\n",
    "import ray\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These are some helper functions that mimic an example pattern of a data parallel application.\n",
    "\n",
    "**EXERCISE:** You will need to turn all of these functions into remote functions. When you turn these functions into remote function, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def load_data(filename):\n",
    "    time.sleep(0.1)\n",
    "    return np.ones((1000, 100))\n",
    "\n",
    "def normalize_data(data):\n",
    "    time.sleep(0.1)\n",
    "    return data - np.mean(data, axis=0)\n",
    "\n",
    "def extract_features(normalized_data):\n",
    "    time.sleep(0.1)\n",
    "    return np.hstack([normalized_data, normalized_data ** 2])\n",
    "\n",
    "def compute_loss(features):\n",
    "    num_data, dim = features.shape\n",
    "    time.sleep(0.1)\n",
    "    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)\n",
    "\n",
    "assert hasattr(load_data, 'remote'), 'load_data must be a remote function'\n",
    "assert hasattr(normalize_data, 'remote'), 'normalize_data must be a remote function'\n",
    "assert hasattr(extract_features, 'remote'), 'extract_features must be a remote function'\n",
    "assert hasattr(compute_loss, 'remote'), 'compute_loss must be a remote function'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.\n",
    "\n",
    "**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sleep a little to improve the accuracy of the timing measurements below.\n",
    "time.sleep(2.0)\n",
    "start_time = time.time()\n",
    "\n",
    "losses = []\n",
    "for filename in ['file1', 'file2', 'file3', 'file4']:\n",
    "    inner_start = time.time()\n",
    "\n",
    "    data = load_data(filename)\n",
    "    normalized_data = normalize_data(data)\n",
    "    features = extract_features(normalized_data)\n",
    "    loss = compute_loss(features)\n",
    "    losses.append(loss)\n",
    "    \n",
    "    inner_end = time.time()\n",
    "    \n",
    "    if inner_end - inner_start >= 0.1:\n",
    "        raise Exception('You may be calling ray.get inside of the for loop! '\n",
    "                        'Doing this will prevent parallelism from being exposed. '\n",
    "                        'Make sure to only call ray.get once outside of the for loop.')\n",
    "\n",
    "print('The losses are {}.'.format(losses) + '\\n')\n",
    "loss = sum(losses)\n",
    "\n",
    "end_time = time.time()\n",
    "duration = end_time - start_time\n",
    "\n",
    "print('The loss is {}. This took {} seconds. Run the next cell to see '\n",
    "      'if the exercise was done correctly.'.format(loss, duration))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "assert loss == 4000\n",
    "assert duration < 0.8, ('The loop took {} seconds. This is too slow.'\n",
    "                        .format(duration))\n",
    "assert duration > 0.4, ('The loop took {} seconds. This is too fast.'\n",
    "                        .format(duration))\n",
    "\n",
    "print('Success! The example took {} seconds.'.format(duration))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Use the UI to view the task timeline and to verify that the four tasks were executed in parallel. You can do this as follows.\n",
    "\n",
    "1. Run the following cell to generate a JSON file containing the profiling data.\n",
    "2. Download the timeline file by right clicking on `timeline02.json` in the navigator to the left and choosing **\"Download\"**.\n",
    "3. Open [chrome://tracing/](chrome://tracing/) in the Chrome web browser, click on the **\"Load\"** button and load the downloaded JSON file.\n",
    "\n",
    "To navigate within the timeline, do the following.\n",
    "- Move around by clicking and dragging.\n",
    "- Zoom in and out by holding **alt** and scrolling.\n",
    "\n",
    "**NOTE:** The timeline visualization will only work in **Chrome**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.timeline(filename=\"timeline02.json\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Application: Parallel web-scraping\n",
    "\n",
    "One useful application of what we have learned so far is to scrape information from the web. We will illustrate this in a toy setting, but the same principles apply on a large scale where crawling through websites, parsing them and extracting useful content (e.g. for building a search index or populating a database) is often very computationally demanding.\n",
    "\n",
    "We break up the process into multiple steps. We first grab the raw HTML of the website using Python's requests package. Then, we use BeautifulSoup to parse the HTML to find the relevant information. Finally, we populate a pandas DataFrames so that we are able to work with the data.\n",
    "\n",
    "To demonstrate this, we scrape GitHub commits to see the latest commits on several repositories."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install requests"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from bs4 import BeautifulSoup\n",
    "import requests\n",
    "\n",
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following function uses these libraries to parse the latest commits from several repositories on GitHub."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def fetch_commits(repo):\n",
    "    url = 'https://github.com/{}/commits/master'.format(repo)\n",
    "    response = requests.get(url)\n",
    "    soup = BeautifulSoup(response.text, 'lxml')\n",
    "    results = []\n",
    "    for commit_elt in soup.find_all('li', class_='commit'):\n",
    "        title = commit_elt.find_all('a', class_='message')[0].attrs.get('aria-label').split('\\n')[0]\n",
    "        link_elts = commit_elt.find_all('a', class_='issue-link')\n",
    "        link = None\n",
    "        for le in link_elts:\n",
    "            if 'issue' in le.attrs['href'].lower():\n",
    "                link = le.attrs['href']\n",
    "        results.append(dict(title=title, link=link))\n",
    "\n",
    "    df = pd.DataFrame(results)\n",
    "    df['repository'] = repo\n",
    "    return df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's try this out to get results for some ray related topics serially."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start = time.time()\n",
    "repos = [\"ray-project/ray\", \"modin-project/modin\", \"tensorflow/tensorflow\", \"apache/arrow\"]\n",
    "results = []\n",
    "for repo in repos:\n",
    "    df = fetch_commits(repo)\n",
    "    results.append(df)\n",
    "    \n",
    "df = pd.concat(results, sort=False)\n",
    "duration = time.time() - start\n",
    "print(\"Constructing the dataframe took {} seconds.\".format(duration))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE**: Speed up the above serial query by making `fetch_commits` a remote function in order to scrape GitHub results in parallel. Then, see a sample of the data scraped below and feel free to play with the data to find other resources to learn more about these libraries!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}